package com.atuguigu.flink.Alert;

import com.atuguigu.flink.sensor.SendsorReading;
import com.atuguigu.flink.sensor.SensorSource;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Flink streaming job that connects a temperature-sensor stream with a
 * smoke-level stream and prints an alert for every reading above 100.0
 * that arrives while the last seen smoke level is {@code HIGH}.
 */
public class MultiStream {

    /**
     * Builds the two-input topology, prints the resulting alerts and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First input stream: temperature sensor readings.
        DataStreamSource<SendsorReading> temperatureReadings = env.addSource(new SensorSource());

        // Second input stream: smoke levels; the source itself is pinned to parallelism 1.
        DataStreamSource<SmokeLevel> smokeLevels =
                env.addSource(new SomkeLevelSource()).setParallelism(1);

        // Temperature readings are partitioned by their sensor id.
        KeySelector<SendsorReading, String> bySensorId = new KeySelector<SendsorReading, String>() {
            @Override
            public String getKey(SendsorReading reading) throws Exception {
                return reading.id;
            }
        };

        // Process the temperature sensor and the smoke sensor together: the keyed
        // readings are connected with the broadcast smoke stream and handed to RaiseAlert.
        SingleOutputStreamOperator<Alter> alerts = temperatureReadings
                .keyBy(bySensorId)
                .connect(smokeLevels.broadcast())
                .flatMap(new RaiseAlert());

        alerts.print();

        env.execute();
    }

    /**
     * Two-input flat-map: input 1 carries temperature readings, input 2 carries
     * smoke levels. The latest smoke level is remembered in a plain instance field
     * and consulted for every temperature reading.
     */
    public static class RaiseAlert implements CoFlatMapFunction<SendsorReading, SmokeLevel, Alter> {

        // Most recently received smoke level; LOW until input 2 delivers a value.
        SmokeLevel smokeLevel = SmokeLevel.LOW;

        /**
         * Handles an element of the first stream (a temperature reading).
         * Emits one {@code Alter} when the stored smoke level is HIGH and the
         * reading's temperature is greater than 100.0; otherwise emits nothing.
         *
         * @param reading the temperature reading
         * @param out     collector receiving the alert, if any
         */
        @Override
        public void flatMap1(SendsorReading reading, Collector<Alter> out) throws Exception {
            // Smoke level is checked first; the temperature is only read when smoke is HIGH.
            if (this.smokeLevel != SmokeLevel.HIGH) {
                return;
            }
            if (!(reading.temperture > 100.0)) {
                return;
            }

            String message = "第" + reading.id + "片森林着火了.";
            out.collect(new Alter(message, reading.timestamp));
        }

        /**
         * Handles an element of the second stream (a smoke level) by storing it
         * as the current level. Never emits anything.
         *
         * @param level the newly received smoke level
         * @param out   unused
         */
        @Override
        public void flatMap2(SmokeLevel level, Collector<Alter> out) throws Exception {
            this.smokeLevel = level;
        }
    }
}
